{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "1041ae6f",
   "metadata": {},
   "source": [
    "![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "247fb2ab",
   "metadata": {},
   "source": [
    "### [Integrated Audits: Streamlined Data Observability with Apache Iceberg](https://tabular.io/blog/integrated-audits/)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd61c16f",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "747bee98",
   "metadata": {},
   "source": [
    "To be able to rerun the notebook several times, let's drop the `permits` table if it exists to start fresh."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "26245f7e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DROP TABLE IF EXISTS nyc.permits"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "eead44c0",
   "metadata": {},
   "source": [
    "# Load NYC Film Permits Data"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6f9a9f41",
   "metadata": {},
   "source": [
    "For this demo, we will use the [New York City Film Permits dataset](https://data.cityofnewyork.us/City-Government/Film-Permits/tg4x-b46p) available as part of the NYC Open Data initiative. We're using a locally saved copy of a 1000 record sample, but feel free to download the entire dataset to use in this notebook!\n",
    "\n",
    "We'll save the sample dataset into an iceberg table called `permits`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e3cc669a",
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "df = spark.read.option(\"inferSchema\",\"true\").option(\"multiline\",\"true\").json(\"/home/iceberg/data/nyc_film_permits.json\")\n",
    "df.write.saveAsTable(\"nyc.permits\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "378cf187",
   "metadata": {},
   "source": [
    "Taking a quick peek at the data, you can see that there are a number of permits for different boroughs in New York."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f3170161",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.read \\\n",
    "    .format(\"iceberg\") \\\n",
    "    .load(\"nyc.permits\") \\\n",
    "    .groupBy(\"borough\") \\\n",
    "    .count() \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c85a71a2",
   "metadata": {},
   "source": [
    "# Generate an ID for an Integrated Audit Session"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "182510da",
   "metadata": {},
   "source": [
    "An integrated audit session is a single cadence of:\n",
    "1. Staging changes to a table\n",
    "2. Auditing the staged changes\n",
    "3. Committing the changes (optional)\n",
    "\n",
    "Each of these sessions must be represented with an ID. You can use any convention that makes sense in your environment but in this demo we'll simply use a UUID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0e39d3d1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import uuid\n",
    "ia_session_id = uuid.uuid4().hex\n",
    "ia_session_id"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fa31a9ea",
   "metadata": {},
   "source": [
    "# The Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d845953b",
   "metadata": {},
   "source": [
    "Tables by default are not configured to allow integrated audits, therefore the first step is enabling this by setting the `write.wap.enabled` table metadata property to `true`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bf29df0b",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "ALTER TABLE nyc.permits\n",
    "SET TBLPROPERTIES (\n",
    "    'write.wap.enabled'='true'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1dc5ad69",
   "metadata": {},
   "source": [
    "Next, the `spark.wap.id` property of your Spark session configuration must be set to the integrated audit session ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "65bc4280",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.conf.set('spark.wap.id', ia_session_id)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3a34995b",
   "metadata": {},
   "source": [
    "With a `spark.wap.id` value set, you can now safely write **directly to the permits table**--don't worry, these changes will only be staged, not committed!"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "437088f6",
   "metadata": {},
   "source": [
    "# Staging The Changes"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1c9fa6e9",
   "metadata": {},
   "source": [
    "To stage the changes, you simply write directly to the `permits` table. This is awesome in situations where you're working with a large and complex data ingestion pipeline.\n",
    "Instead of including hard-coded logic in your pipeline to switch between a sort of \"audit-mode\" as opposed to \"production-mode\", with integrated audits you simple run your\n",
    "production code!\n",
    "\n",
    "For this demo, let's use a simple query that deletes all records for film permits in the manhattan borough."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "14843243",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "DELETE FROM nyc.permits\n",
    "WHERE borough='Manhattan'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "56cc8478",
   "metadata": {},
   "source": [
    "As described, even though the query was executed against the production table, these changes are only staged and not committed since we are within an integrated audit session. Let's confirm this by verifying that a count by borough still includes the Manhattan records."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "95df15e9",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT borough, count(*) permit_cnt\n",
    "FROM nyc.permits\n",
    "GROUP BY borough"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "3fe2c863",
   "metadata": {},
   "source": [
    "# The Audit"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a7935b0d",
   "metadata": {},
   "source": [
    "Once the changes for this session are staged, you can perform all of your audits to validate the data. The first step is to retrieve the snapshot ID generated by the changes and tagged with this integrated audit session ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "95d71430",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "SELECT snapshot_id\n",
    "FROM nyc.permits.snapshots\n",
    "WHERE summary['wap.id'] = '{ia_session_id}'\n",
    "\"\"\"\n",
    "ia_session_snapshot_records = %sql $query\n",
    "ia_session_snapshot = ia_session_snapshot_records.rows[0][0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1035b246",
   "metadata": {},
   "outputs": [],
   "source": [
    "ia_session_snapshot"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4c602800",
   "metadata": {},
   "source": [
    "This snapshot includes the staged (but not commited) changes to your production table. Once you have this snapshot ID, you can use Iceberg's Time Travel feature to query it!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c95130e9",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.read \\\n",
    "    .option(\"snapshot-id\", ia_session_snapshot) \\\n",
    "    .format(\"iceberg\") \\\n",
    "    .load(\"nyc.permits\") \\\n",
    "    .groupBy(\"borough\") \\\n",
    "    .count() \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0cab3813",
   "metadata": {},
   "source": [
    "At this point, you can use any auditing tool or technique to validate your changes. For this demo, we'll do a simple audit that confirms that the only remaining boroughs are Queens, Brooklyn, Bronx, and Staten Island. If either borough is missing or any additional boroughs are found, we'll raise an exception."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82af5de3",
   "metadata": {},
   "outputs": [],
   "source": [
    "expected_boroughs = {\"Queens\", \"Brooklyn\", \"Bronx\", \"Staten Island\"}\n",
    "distinct_boroughs = spark.read \\\n",
    "    .option(\"snapshot-id\", ia_session_snapshot) \\\n",
    "    .format(\"iceberg\") \\\n",
    "    .load(\"nyc.permits\") \\\n",
    "    .select(\"borough\") \\\n",
    "    .distinct() \\\n",
    "    .toLocalIterator()\n",
    "boroughs = {row[0] for row in distinct_boroughs}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4fad7c2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Since `boroughs` and `required_boroughs` are both sets (array of distinct items),\n",
    "# we can confirm that they match by checking that the lengths of the sets are equal\n",
    "# to eachother as well as to the union of both sets.\n",
    "if len(boroughs) != len(expected_boroughs) != len(set.union(boroughs, expected_boroughs)):\n",
    "    raise ValueError(f\"Audit failed, borough set does not match expected boroughs: {boroughs} != {expected_boroughs}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b1032255",
   "metadata": {},
   "source": [
    "If the above check does not fail, we can go ahead and commit our staged data to publish our changes!"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2079435b",
   "metadata": {},
   "source": [
    "# The Publish"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "88d59f50",
   "metadata": {},
   "source": [
    "After the audits are completed, publishing the data is as simple as running a `cherrypick_snapshot` stored procedure."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "056236ba",
   "metadata": {},
   "outputs": [],
   "source": [
    "publish_query = f\"CALL demo.system.cherrypick_snapshot('nyc.permits', {ia_session_snapshot})\"\n",
    "%sql $publish_query"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "17b868e8",
   "metadata": {},
   "source": [
    "That's it! Publishing the changes from this integrated audit session is a simple metadata-only operation that instantly makes the changes live for all downstream consumers querying the `permits` table! Query results will now include the commit that removed all Manhattan records."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "930682ce",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.read \\\n",
    "    .format(\"iceberg\") \\\n",
    "    .load(\"nyc.permits\") \\\n",
    "    .groupBy(\"borough\") \\\n",
    "    .count() \\\n",
    "    .show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d941b990",
   "metadata": {},
   "source": [
    "# What Happens When The Audits Fail?"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f6b4084e",
   "metadata": {},
   "source": [
    "What about when your audits fail? What happens to the snapshots generated? How about the data and metadata files?\n",
    "\n",
    "One of the best parts of Iceberg's integrated audits is that the cleanup of \"*staged-yet-not-committed-data*\" is part of the normal snapshot cleanup process of a typical Iceberg warehouse. To be more specific, let's say a daily snapshot expiration is performed on the data warehouse (using the [expire_snapshots](https://iceberg.apache.org/docs/latest/spark-procedures/#expire_snapshots) procedure) and all snapshots older than 7 days are expired. That means once your staged snapshot reaches 7 days in age, it will be expired.\n",
    "\n",
    "Additionally, since the changes were never committed, the underlying data files for the snapshot will be removed since they're not referenced by any other snapshots in the linear history of the table.\n",
    "\n",
    "Let's see this in action. First, start a new integrated audit session and stage a commit by inserting a single record."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "25eff1ef",
   "metadata": {},
   "outputs": [],
   "source": [
    "ia_session_id = uuid.uuid4().hex\n",
    "ia_session_id"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4726b169",
   "metadata": {},
   "outputs": [],
   "source": [
    "spark.conf.set('spark.wap.id', ia_session_id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "31bf19f1",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "INSERT INTO nyc.permits\n",
    "VALUES (\n",
    "    'Hoboken',\n",
    "    'Television',\n",
    "    '1',\n",
    "    'United States of America',\n",
    "    '2021-11-24T23:00:00.000',\n",
    "    '2021-11-23T09:38:17.000',\n",
    "    'Mayor\\'s Office of Film, Theatre & Broadcasting',\n",
    "    '613322',\n",
    "    'Shooting Permit',\n",
    "    'WASHINGTON STREET',\n",
    "    '100',\n",
    "    '2021-11-24T07:00:00.000',\n",
    "    'Episodic series',\n",
    "    '07030'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "aa29184b",
   "metadata": {},
   "source": [
    "Next, let's identify the snapshot that was tagged with the integrated audit session ID."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "682a5f52",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT snapshot_id\n",
    "FROM nyc.permits.snapshots"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ef4dd148",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "SELECT snapshot_id\n",
    "FROM nyc.permits.snapshots\n",
    "WHERE summary['wap.id'] = '{ia_session_id}'\n",
    "\"\"\"\n",
    "ia_session_snapshot_records = %sql $query\n",
    "ia_session_snapshot = ia_session_snapshot_records.rows[0][0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "62f52c08",
   "metadata": {},
   "outputs": [],
   "source": [
    "ia_session_snapshot"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "60561dff",
   "metadata": {},
   "source": [
    "A quick check of the history table shows that this snapshot is not included as part of the current history of the table since it has not been published yet."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec96a9c0",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT *\n",
    "FROM nyc.permits.history"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ec54f3a",
   "metadata": {},
   "source": [
    "In a scenario where the audits fail and this change is not published, the `expire_snapshots` procedure will clean up the snapshot **and** the data files. Let's demonstrate this by calling the `expire_snapshots` procedure for all snapshots older than the current timestamp."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4727c61e",
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "%sql CALL demo.system.expire_snapshots('nyc.permits', {round(time.time() * 1000)}, 100)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c8e47351",
   "metadata": {},
   "source": [
    "The output from the `expire_snapshots` procedure shows that a data file, a manifest file, and a manifest list file were deleted. Furthermore, the snapshot no longer appears in the permit table's snapshots table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "53f53072",
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "SELECT *\n",
    "FROM nyc.permits.snapshots"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
